import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.math.Ordering.Int



object FlinkKafkaDDLDemo {

  /**
   * Demo: declares a Kafka-backed table via SQL DDL, runs a streaming
   * `GROUP BY` count over it, and prints the resulting retract stream.
   *
   * NOTE(review): this uses the legacy `connector.*` property keys and the
   * Blink planner (`useBlinkPlanner`), both deprecated/removed in newer Flink
   * releases — confirm the target Flink version before upgrading.
   */
  def main(args: Array[String]): Unit = {

    // Streaming execution environment.
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Blink planner in streaming mode (legacy API — see note above).
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // Table environment bridged onto the stream environment.
    val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    // Kafka source table DDL. Every line now carries the `|` margin so that
    // stripMargin yields uniformly left-aligned SQL (the original mixed
    // margin and non-margin lines, leaving stray indentation in the string).
    val createTable =
      """CREATE TABLE PERSON (
        |    name VARCHAR COMMENT '姓名',
        |    age VARCHAR COMMENT '年龄',
        |    city VARCHAR COMMENT '所在城市',
        |    address VARCHAR COMMENT '家庭住址',
        |    ts BIGINT COMMENT '时间戳',
        |    pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义事件时间
        |    WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND
        |)
        |WITH (
        |    'connector.type' = 'kafka', -- 使用 kafka connector
        |    'connector.version' = 'universal',  -- kafka 版本
        |    'connector.topic' = 'kafka_ddl',  -- kafka topic
        |    'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取
        |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
        |    'connector.properties.0.value' = 'Desktop:2181',
        |    'connector.properties.1.key' = 'bootstrap.servers',
        |    'connector.properties.1.value' = 'Desktop:9091',
        |    'update-mode' = 'append',
        |    'format.type' = 'json',  -- 数据源格式为 json
        |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
        |)
        |""".stripMargin

    // Echo the DDL for debugging, then register the table.
    println(createTable)
    tEnv.executeSql(createTable)

    // Continuous aggregation; results are updated per group, hence the
    // retract stream below. (Dropped a no-op stripMargin: the string has
    // no margin characters.)
    val query: String = "SELECT name,COUNT(age) FROM PERSON GROUP BY name"

    val result: Table = tEnv.sqlQuery(query)

    // A retract stream is required because COUNT updates previously
    // emitted rows.
    tEnv.toRetractStream[Row](result).print()

    bsEnv.execute("Flink SQL DDL")
  }

}

// According to the article below, the API usage above appears to be deprecated.
// https://blog.csdn.net/weixin_41608066/article/details/107769826